/**
 * Licensed to the Rivulet under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *     webapps/LICENSE-Rivulet-1.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.rivues.task;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;

import com.rivues.core.RivuDataContext;
import com.rivues.module.platform.web.model.JobDetail;

/**
 * @author jaddy0302 Rivulet CrawlTaskJob.java 2010-3-6
 * 
 */
public class CrawlTaskJob {
	private static Logger log = LoggerFactory.getLogger(CrawlTaskJob.class) ;
	private TaskExecutor taskExecutor;
	private AtomicInteger runnintTaskNum = new AtomicInteger() ;
	private int maxRunnintTask = 20 ;	//最大同时执行任务的数量
	private static boolean hasTaskRunning = false ;
	/**
	 * 获取任务信息
	 * @return
	 */
	private synchronized JobDetail getJobDetail(){
		Map<String, Object> jobDetailMap = RivuDataContext.getClusterInstance().get(RivuDataContext.DistributeEventEnum.JOBDETAIL.toString());
		JobDetail jobDetail = null ;
		try{
			/**
			 * 分布式锁
			 */
			Map<String, Object> runningJobMap = RivuDataContext.getClusterInstance().get(RivuDataContext.DistributeEventEnum.RUNNINGJOB.toString());
			Iterator<String> jobKeyIterator = jobDetailMap.keySet().iterator();
			while(jobKeyIterator.hasNext()){
				String jobKey = jobKeyIterator.next() ;
				JobDetail tempJobDetail = (JobDetail) jobDetailMap.get(jobKey) ;
				if(jobDetail==null || tempJobDetail.getPriority()<jobDetail.getPriority()){
					jobDetail = tempJobDetail ;
				}
				if(jobDetail.getPriority() <= 1){
					break ;
				}
			}
			String serverName = RivuDataContext.getServerName() ;
			if(jobDetail!=null && runningJobMap.get(jobDetail.getId())==null && (jobDetail.getRunserver()==null || jobDetail.getRunserver().length() == 0 || jobDetail.getRunserver().equals(serverName))){
				/**
				 * 以下代码启用分布式锁，将 当前加入到执行队列中的任务从等待队列移除，放入到执行队列，然后开始启动线程执行
				 */
				jobDetailMap.remove(jobDetail.getId()) ;
				runningJobMap.put(jobDetail.getId(), jobDetail) ;
			}else{
				jobDetail = null ;
			}
		}catch(Exception ex){
			ex.printStackTrace();
		}
		return jobDetail ;
	}
	/**
	 * @throws ClassNotFoundException
	 * 
	 */
	public void execute() throws ClassNotFoundException {
		JobDetail jobDetail = null ;
		while(!hasTaskRunning && (runnintTaskNum.intValue() < maxRunnintTask) && (jobDetail = getJobDetail()) !=null){
			hasTaskRunning = true ;
			runnintTaskNum.incrementAndGet();//开始执行任务，计数
			/**
			 * 当前主线程中 获取 JobDetail
			 */
			try{
				/**
				 * 将任务放入到本地执行任务队列中，目的是为了在 任务停止的时候能响应到
				 */
				if(RivuDataContext.getLocalRunningJob().get(jobDetail.getId())==null){
					taskExecutor.execute(new Task(jobDetail)) ;
				}
			}catch (Exception e) {
				// TODO: handle exception
				
			}finally{
				hasTaskRunning = false ;
				runnintTaskNum.decrementAndGet();//任务执行结束，计数减一
			}
		}
	}
	
	/**
	 * @return the taskExecutor
	 */
	public TaskExecutor getTaskExecutor() {
		return taskExecutor;
	}

	/**
	 * @param taskExecutor
	 *            the taskExecutor to set
	 */
	public void setTaskExecutor(TaskExecutor taskExecutor) {
		this.taskExecutor = taskExecutor;
	}

	/**
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			String text = "未采集到数据 ，请确认采集信息是否正确。";
			for (int i = 0; i < text.length(); i++) {
				System.out.print("&#" + (int) text.charAt(i) + ";");
			}
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
